package com.etc.spark

import com.etc.dao.CourseClickCountDAO
import com.etc.domain.{ClickLog, CourseClickCount}
import com.etc.utils.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer
import scala.util.Try

/**
  * 从kafka获取数据然后
  * 进行数据清理之后储存到hbase
  * @author l
  */

object FlumeKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(120))
    val  zkQuorum = "192.168.88.11:2181,192.168.88.12:2181,192.168.88.13:2181"
    //组id同一个组可以共享
    val groupId = "g1"
    val topic = Map[String,Int]("kk8"->1)

    //创建DStream, 需要kafkaDStream
    val data: ReceiverInputDStream[(String,String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
    //对数据进行处理
    //kafka的 ReceiverInputDStream[(String,String)]里面装着一个元组(key是写入的key的值，value是实际的写入的内容)
    val lines: DStream[String] = data.map(_._2)
    //输入数据格式
    //72.156.187.63	2019-04-04 09:55:01	"GET /class/131.html HTTP/1.1 "	-	404

    // 步骤二 ： 数据清理
    val logs = lines.map(line => {
      val infos = line.split("\t")                           //用\t就是为了不把引号里面的空格切开
      var courseId = 0
      if (infos.length == 5) {
        val shizhen = infos(2).split("/")                     //    /class/131.html HTTP/1.1
        //获得实战的课程编号
        if (shizhen(2).contains(".")){
          courseId = shizhen(2).split(".html")(0).toInt
        }
        ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(4).toInt, infos(3))
      }else{
        ClickLog("", "", 0, 0, "")
      }
    }).filter(clicklog => clicklog.courseId != 0)
    logs.print()

//    // 测试步骤三：统计今天到现在为止实战课程的访问量
//
    logs.map(x => {
      // HBase rowkey设计： 20171111_88
      (x.time.substring(0, 8) + "_" + x.courseId, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseClickCount(pair._1, pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
